package cn.com.tranzvision.realtime.etl.app

import cn.com.tranzvision.realtime.etl.process.{CartDataETL, ClickLogDataETL, CommentsDataETL, GoodsDataETL, OrderDataETL, OrderDetailDataETL, SyncDimData}
import cn.com.tranzvision.realtime.etl.utils.GlobalConfigUtil
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
 * ETL driver: configures the Flink streaming environment (checkpointing, state backend,
 * restart strategy, distributed cache), registers every real-time ETL pipeline on it and
 * then submits the job.
 *
 * @date : 2021/7/21 10:01
 */
object App {

  /**
   * Entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // 1. Initialise the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Use a default parallelism of 1 for the job
    env.setParallelism(1)

    // 3. Checkpointing, state backend and restart strategy
    configureFaultTolerance(env)

    // Ship the IP-address database to the TaskManagers through the distributed cache,
    // registered under the name "qqwry.dat".
    env.registerCachedFile(GlobalConfigUtil.`ip.file.path`, "qqwry.dat")

    // 4. Build all ETL pipelines on the shared environment
    registerEtlProcesses(env)

    // Submit and run the job
    env.execute()
  }

  /**
   * Configures checkpointing, the checkpoint state backend and the restart strategy.
   *
   * @param env environment to configure (mutated in place)
   */
  private def configureFaultTolerance(env: StreamExecutionEnvironment): Unit = {
    // Trigger a checkpoint every 5 seconds
    env.enableCheckpointing(5000)

    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Keep externalized checkpoints when the job is cancelled, so it can later be
    // restored from them instead of losing its state.
    checkpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Allow at most one checkpoint in flight at a time (no concurrent checkpoints)
    checkpointConfig.setMaxConcurrentCheckpoints(1)
    // Minimum pause (ms) between two consecutive checkpoints
    checkpointConfig.setMinPauseBetweenCheckpoints(1000)
    // Checkpoint timeout (ms)
    checkpointConfig.setCheckpointTimeout(60000)

    // Store checkpoints on HDFS
    env.setStateBackend(new FsStateBackend("hdfs://k8s01:9000/flink/checkpoint/"))

    // On failure, restart up to 5 times with a 5 s delay between attempts; once that is
    // exhausted the job fails. Without an explicit strategy, a checkpointed job would be
    // restarted indefinitely.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000))
  }

  /**
   * Instantiates each ETL stage on the shared environment and calls its `process()` method,
   * in a fixed order.
   *
   * @param env environment the stages are built on
   */
  private def registerEtlProcesses(env: StreamExecutionEnvironment): Unit = {
    // 1. Incremental sync of dimension data into Redis
    new SyncDimData(env).process()
    // 2. Click-stream log ETL
    new ClickLogDataETL(env).process()
    // 3. Order ETL
    new OrderDataETL(env).process()
    // 4. Order-detail ETL
    new OrderDetailDataETL(env).process()
    // 5. Goods (product) ETL
    new GoodsDataETL(env).process()
    // 6. Shopping-cart ETL
    new CartDataETL(env).process()
    // 7. Comments ETL
    new CommentsDataETL(env).process()
  }
}
